package org.voovan.korla.socket;

import org.voovan.korla.exception.KorlaException;
import org.voovan.korla.message.Callback;
import org.voovan.korla.message.Msg;
import org.voovan.tools.TEnv;
import org.voovan.tools.log.Logger;
import org.voovan.tools.pool.ObjectPool;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

import static org.voovan.korla.KorlaStatic.*;
import static org.voovan.tools.TObject.nullDefault;

/**
 * Manager object for a pool of {@link KorlarConsumer} connections.
 *
 * <p>The pool is backed by an {@link ObjectPool}; consumers are created lazily by the
 * pool's supplier, validated with {@link KorlarConsumer#isConnect()} and closed by the
 * pool's destroy hook.
 *
 * @author helyho
 * korla Framework.
 * WebSite: https://github.com/helyho/korla
 * Licence: Apache v2 License
 */
public class KorlaConsumerPool {
    /** Life-cycle state of the pool as tracked by this class. */
    public enum Status {
        INIT, OPEN, CLOSING, CLOSED
    }


    private String host;
    private int port;
    private int readTimeout;
    private int sendTimeout;
    private transient ObjectPool<KorlarConsumer> objectPool;

    private Status status = Status.INIT;

    /**
     * Constructor.
     *
     * @param channel     channel name; {@code DEFAULT_CHANNEL} is used when {@code null}
     * @param host        remote address
     * @param port        remote port
     * @param readTimeout read timeout, unit: milliseconds
     * @param sendTimeout send timeout, unit: milliseconds
     * @param minPoolSize minimum pool capacity
     * @param maxPoolSize maximum pool capacity
     */
    public KorlaConsumerPool(String channel, String host, int port, int readTimeout, int sendTimeout, int minPoolSize, int maxPoolSize) {
        this.host = host;
        this.port = port;
        this.readTimeout = readTimeout;
        this.sendTimeout = sendTimeout;

        // Resolve the channel once into an effectively-final local so the supplier lambda can capture it.
        final String channelName = nullDefault(channel, DEFAULT_CHANNEL);

        String name = " [" + channelName + "] -> " + host + ":" + port;

        Logger.simplef("KorlaConsumerManager {}, minPoolSize: {}, maxPoolSize: {}", name, minPoolSize, maxPoolSize );

        objectPool = new ObjectPool<KorlarConsumer>()
                .minSize(minPoolSize)
                .maxSize(maxPoolSize)
                // Create a new consumer when the pool has none to hand out.
                .supplier(() -> {
                    try {
                        KorlarConsumer korlarConsumer = new KorlarConsumer(channelName, host, port, readTimeout, sendTimeout);
                        korlarConsumer.connect();

                        // Batch-commit parameters.
                        korlarConsumer.batch(BATCH_SIZE, BATCH_INTERVALE);
                        return korlarConsumer;
                    } catch (IOException e) {
                        Logger.error("Create KorlarConsumer " + host + ":" + port + " error:", e);
                        // Wait before the next attempt after a failed connection
                        // (CONNECT_FAILED_WAIT is presumably in seconds, given the *1000 — confirm).
                        TEnv.sleep(CONNECT_FAILED_WAIT *1000);
                    }
                    // Creation failed: hand null back to the pool.
                    return null;
                })
                // Close the consumer when the pool discards it.
                .destory((korlarConsumer) -> {
                    korlarConsumer.close();
                    return true;
                })
                // Availability check; per the original note, an unavailable consumer is skipped for the next one.
                .validator((korlarConsumer) -> {
                    return korlarConsumer.isConnect();
                })
                .create();

        this.status = Status.OPEN;
    }


    /**
     * Constructor for a fixed-size pool (minimum and maximum capacity are both {@code poolSize}).
     *
     * @param channel     channel name
     * @param host        remote address
     * @param port        remote port
     * @param readTimeout read timeout, unit: milliseconds
     * @param sendTimeout send timeout, unit: milliseconds
     * @param poolSize    pool capacity
     */
    public KorlaConsumerPool(String channel, String host, int port, int readTimeout, int sendTimeout, int poolSize) {
        this(channel, host, port, readTimeout, sendTimeout, poolSize, poolSize);
    }

    /**
     * Constructor using the default channel.
     *
     * @param host        remote address
     * @param port        remote port
     * @param readTimeout read timeout, unit: milliseconds
     * @param sendTimeout send timeout, unit: milliseconds
     * @param minPoolSize minimum pool capacity
     * @param maxPoolSize maximum pool capacity
     */
    public KorlaConsumerPool(String host, int port, int readTimeout, int sendTimeout, int minPoolSize, int maxPoolSize) {
        this(null, host, port, readTimeout, sendTimeout, minPoolSize, maxPoolSize);
    }

    /**
     * Constructor for a fixed-size pool using the default channel.
     *
     * @param host        remote address
     * @param port        remote port
     * @param readTimeout read timeout, unit: milliseconds
     * @param sendTimeout send timeout, unit: milliseconds
     * @param poolSize    pool capacity
     */
    public KorlaConsumerPool(String host, int port, int readTimeout, int sendTimeout, int poolSize) {
        this(host, port, readTimeout, sendTimeout, poolSize, poolSize);
    }

    /** @return the host passed to the constructor */
    public String getHost() {
        return host;
    }

    /** @return the port passed to the constructor */
    public int getPort() {
        return port;
    }

    /** @return the read timeout passed to the constructor */
    public int getReadTimeout() {
        return readTimeout;
    }

    /** @return the send timeout passed to the constructor */
    public int getSendTimeout() {
        return sendTimeout;
    }

    /** @return the value of {@code size()} on the underlying object pool */
    public int getSize() {
        return objectPool.size();
    }

    /** @return the value of {@code avaliableSize()} on the underlying object pool */
    public int getAvaliableSize() {
        return objectPool.avaliableSize();
    }

    /** @return the maximum size configured on the underlying object pool */
    public int getMaxSize() {
        return objectPool.getMaxSize();
    }

    /** @return the minimum size configured on the underlying object pool */
    public int getMinSize() {
        return objectPool.getMinSize();
    }

    /**
     * Sets both the maximum and the minimum size of the underlying pool to the same value.
     *
     * @param poolSize new pool capacity
     */
    public void setPoolSize(int poolSize) {
        // Order kept as in the original: maximum first, then minimum.
        objectPool.maxSize(poolSize);
        objectPool.minSize(poolSize);
    }

    /**
     * Reports whether the pool currently has consumers to hand out.
     *
     * <p>Side effect: when the pool is found available, the status is set to
     * {@link Status#OPEN}.
     * NOTE(review): this also overwrites CLOSING/CLOSED — confirm that re-opening is intended.
     *
     * @return {@code true} when both the available size and the total size are non-zero
     */
    public boolean isAvaliable() {
        if (getAvaliableSize() == 0 || getSize() == 0) {
            return false;
        }

        status = Status.OPEN;
        return true;
    }

    /** @return the current status of this pool */
    public Status getStatus() {
        return status;
    }

    /**
     * Borrows a consumer from the pool.
     *
     * @return the borrowed consumer, or {@code null} when the pool returned none
     */
    public KorlarConsumer getConsumer() {
        KorlarConsumer korlarConsumer = objectPool.borrow();
        if (korlarConsumer == null) {
            // Report through the framework logger rather than stdout so the event reaches the log output.
            Logger.simplef("KorlaConsumerPool {}:{} get null", host, port);
        }
        return korlarConsumer;
    }

    /**
     * Borrows a consumer from the pool, passing a wait time to the pool.
     *
     * @param waitTime wait time handed to {@code ObjectPool.borrow(long)}
     *                 (presumably milliseconds — confirm against ObjectPool)
     * @return the borrowed consumer, or {@code null} when the pool returned none
     * @throws TimeoutException propagated from the underlying pool
     */
    public KorlarConsumer getConsumer(long waitTime) throws TimeoutException {
        KorlarConsumer korlarConsumer = objectPool.borrow(waitTime);
        if (korlarConsumer == null) {
            // Report through the framework logger rather than stdout so the event reaches the log output.
            Logger.simplef("KorlaConsumerPool {}:{} get null", host, port);
        }
        return korlarConsumer;
    }

    /**
     * Returns a previously borrowed consumer to the pool.
     *
     * @param korlarConsumer the consumer to give back
     */
    public void restitution(KorlarConsumer korlarConsumer) {
        objectPool.restitution(korlarConsumer);
    }

    /** Closes the pool, using the read timeout as the close timeout. */
    public void close() {
        close(readTimeout);
    }

    /**
     * Closes the pool: marks it {@link Status#CLOSING}, clears the underlying pool,
     * waits on the pool's available size, then marks it {@link Status#CLOSED}.
     *
     * @param timeout timeout handed to {@code TEnv.wait}
     */
    public void close(int timeout) {
        this.status = Status.CLOSING;
        objectPool.clear();
        // NOTE(review): presumably blocks while the condition holds, up to the timeout — confirm TEnv.wait semantics.
        TEnv.wait(timeout, () -> objectPool.avaliableSize() > 0);
        this.status = Status.CLOSED;
    }
}
